前言
随着spring被广泛的使用,越来越多的公共组件都慢慢的支持透明的接入spring中,包括dubbo,rabbitmq,ehcache等,我们只需要像配置普通的bean一样,就可以通过spring容器来统一维护和管理这些组件,方便应用程序的使用。下面将细述如何通过spring来整合rabbitmq。
maven依赖
<!-- spring-rabbitmq整合配置 --> |
其中spring-amqp是对rabbitmq访问抽象jar,类似于java中的JDBC标准规范,而spring-rabbit则是对这一规范的具体实现,当然如果牛逼的你觉得用的不爽,可以对spring-amqp规范重新搞一套自己的实现。在spring-rabbit工程中有依赖rabbitmq官方提供的java sdk,所以如果依赖了spring-rabbit就无需在pom.xml中指定amqp-client,除非你想换一个更高版本的。<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.3.4</version>
</dependency>
生产者配置
spring-rabbitmq-producer.xml:<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd ">
<!-- 连接服务配置 -->
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1"
port="5672" username="test" password="123456" virtual-host="/test" />
<rabbit:admin connection-factory="connectionFactory" />
<!-- 消息发送模板-->
<rabbit:template id="defaultAmqpTemplate" connection-factory="connectionFactory" />
</beans>
消息发送服务:
public class RabbitmqService {
private AmqpTemplate defaultAmqpTemplate;
/**
*消息发送
*@param exchange
*@param routingKey
*@param data
*/
public void send(String exchange, String routingKey, byte[] data) {
defaultAmqpTemplate.convertAndSend(exchange, routingKey, data);
}
}
消费者配置
spring-rabbitmq-consumer.xml:<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit-1.3.xsd ">
<!-- 连接服务配置 -->
<rabbit:connection-factory id="connectionFactory" host="127.0.0.1"
port="5672" username="test" password="123456" virtual-host="/test" />
<rabbit:admin connection-factory="connectionFactory" />
<!-- queue litener 观察监听模式,当有消息到达时
会通知监听在对应的队列上的监听对象task-executor="taskExecutor" -->
<rabbit:listener-container connection-factory="connectionFactory" acknowledge="auto">
<rabbit:listener queue-names="ha-cluster-queue" ref="queueListener"/>
<!-- <rabbit:listener queue-names="ha-cluster-queue1" ref="queueListener1"/> -->
.....
</rabbit:listener-container>
<!-- 监听回调-消费者 -->
<bean id="queueListener" class="lpp.rabbitmq.spring.Consumer"></bean>
</beans>
lpp.rabbitmq.spring.Consumer:
在spring中提供了两种消息回调监听器MessageListener(属于spring-amqp)和 ChannelAwareMessageListener(属于spring-rabbit)。
MessageListener:public class Consumer implements MessageListener {
public void onMessage(Message message) {
byte[] data = message.getBody();
System.out.println("收到消息data=" + data);
}
}
ChannelAwareMessageListener:public class Consumer implements ChannelAwareMessageListener {
public void onMessage(Message message, Channel channel) throws Exception {
byte[] data = message.getBody();
System.out.println("收到消息data=" + data);
}
}
总结
在实际的项目中,对于exchange,queue,bind,vhost等都一律由管理员在rabbitmq管理界面中进行添加,不要在项目工程中通过代码的方式去创建,类似于数据库操作一样,在功能上线之前提前要DBA在数据库中建表,同样rabbitmq消息队列服务使用流程也是一样的,在上线之前,提工单给rabbitmq管理员,在rabbitmq管理界面中进行添加功能需要的exchange,queue,bind,分配虚拟主机,访问用户名/密码等。然后才是功能上线 ——可以将rabbitmq服务工作流程类比于数据库服务操作流程。